/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.test.recovery;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.TestActorRef;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.util.Collection;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
 * Tests recovery of {@link SubmittedJobGraph} instances.
 */
public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {

	/**
	 * ZooKeeper test environment shared by all tests of this class. {@link #cleanUp()} calls
	 * {@code deleteAll()} on it before each test and {@link #tearDown()} shuts it down.
	 * NOTE(review): the constructor argument is presumably the number of ZooKeeper servers
	 * (a single one here) - confirm against ZooKeeperTestEnvironment.
	 */
	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);

	/** Time budget of five minutes from which each test derives its deadline. */
	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);

	/**
	 * Per-test temporary folder. The tests hand its root path to
	 * {@code ZooKeeperTestUtils.createZooKeeperHAConfig}, and the verification helpers list the
	 * files below it to inspect the file state backend.
	 */
	@Rule
	public TemporaryFolder tempFolder = new TemporaryFolder();

	/**
	 * Shuts down the shared ZooKeeper test environment once all tests of this class have run.
	 *
	 * @throws Exception if the shutdown of the environment fails
	 */
	@AfterClass
	public static void tearDown() throws Exception {
		ZooKeeper.shutdown();
	}

	/**
	 * Runs before every test and calls {@code deleteAll()} on the shared ZooKeeper environment,
	 * presumably so that no state of a previous test leaks into the next one.
	 *
	 * @throws Exception if the clean-up call fails
	 */
	@Before
	public void cleanUp() throws Exception {
		ZooKeeper.deleteAll();
	}

	// ---------------------------------------------------------------------------------------------

	/**
	 * Verifies that the persisted HA job data survives a shutdown of the JobManager.
	 *
	 * <p>A blocking job is submitted in detached mode to a testing cluster with one JobManager and
	 * one TaskManager. Once the job is reported as RUNNING the cluster is shut down, and the
	 * recovery state (file state backend and ZooKeeper) is checked to still be present.
	 */
	@Test
	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
		final Configuration haConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(
				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());

		// A single JobManager and a single TaskManager are enough for this scenario
		haConfig.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
		haConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

		final TestingCluster cluster = new TestingCluster(haConfig, false, false);

		try {
			final Deadline testDeadline = TestTimeOut.fromNow();

			// Bring up the JobManager and the TaskManager
			cluster.start(true);

			// Guard against shut down races: if the TM goes down before the JM, the job could
			// otherwise be failed, which would lead to removal of its state. A practically
			// unbounded number of restarts prevents that.
			final ExecutionConfig executionConfig = new ExecutionConfig();
			executionConfig.setRestartStrategy(
					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100));

			final JobGraph blockingJob = createBlockingJobGraph();
			blockingJob.setExecutionConfig(executionConfig);

			final ActorGateway leadingJobManager =
					cluster.getLeaderGateway(testDeadline.timeLeft());

			// Fire-and-forget submission
			leadingJobManager.tell(new SubmitJob(blockingJob, ListeningBehaviour.DETACHED));

			// Block until the job is actually running
			JobManagerActorTestUtils.waitForJobStatus(
					blockingJob.getJobID(),
					JobStatus.RUNNING,
					leadingJobManager,
					testDeadline.timeLeft());
		}
		finally {
			cluster.shutdown();
		}

		// The shutdown of the JM must not have removed the persisted job data
		verifyRecoveryState(haConfig);
	}

	/**
	 * Tests that clients receive updates after recovery by a new leader.
	 *
	 * <p>Scenario: two JobManager processes and one TaskManager are started against the shared
	 * ZooKeeper HA configuration. A blocking job is submitted to the current leader in
	 * non-detached mode with a recording client actor as the sender. The leading JobManager
	 * process is then killed, the test waits until the next leader reports the job as RUNNING
	 * again, cancels the job and finally counts the {@code JobSubmitSuccess} messages that the
	 * client has recorded.
	 *
	 * @throws Exception on any failure; the JobManager process logs are printed before the
	 *                   failure is rethrown
	 */
	@Test
	public void testClientNonDetachedListeningBehaviour() throws Exception {
		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
				ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());

		// Test actor system
		ActorSystem testSystem = null;

		// JobManager setup. Start the job managers as separate processes in order to not run the
		// actors postStop, which cleans up all running jobs.
		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];

		LeaderRetrievalService leaderRetrievalService = null;

		ActorSystem taskManagerSystem = null;

		// Created outside of the try block; it is closed unconditionally in the finally block
		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
			config,
			TestingUtils.defaultExecutor(),
			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

		try {
			// All waits below are bounded by the time left until this deadline
			final Deadline deadline = TestTimeOut.fromNow();

			// Test actor system
			testSystem = AkkaUtils.createActorSystem(new Configuration(),
					new Some<>(new Tuple2<String, Object>("localhost", 0)));

			// The job managers
			jobManagerProcess[0] = new JobManagerProcess(0, config);
			jobManagerProcess[1] = new JobManagerProcess(1, config);

			jobManagerProcess[0].startProcess();
			jobManagerProcess[1].startProcess();

			// Leader listener
			TestingListener leaderListener = new TestingListener();
			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
			leaderRetrievalService.start(leaderListener);

			// The task manager
			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
			TaskManager.startTaskManagerComponentsAndActor(
				config,
				ResourceID.generate(),
				taskManagerSystem,
				highAvailabilityServices,
				"localhost",
				Option.<String>empty(),
				false,
				TaskManager.class);

			// Client test actor
			TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(
					testSystem, Props.create(RecordingTestClient.class));

			JobGraph jobGraph = createBlockingJobGraph();

			{
				// Initial submission
				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());

				String leaderAddress = leaderListener.getAddress();
				UUID leaderId = leaderListener.getLeaderSessionID();

				// The client
				AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);

				// Get the leader ref
				ActorRef leaderRef = AkkaUtils.getActorRef(
						leaderAddress, testSystem, deadline.timeLeft());
				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);

				// Keep asking the leader until it reports at least one slot (presumably this is
				// the case once the task manager has registered). The loop does not pause
				// between requests.
				int numSlots = 0;
				while (numSlots == 0) {
					Future<?> slotsFuture = leader.ask(JobManagerMessages
							.getRequestTotalNumberOfSlots(), deadline.timeLeft());

					numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
				}

				// Submit the job in non-detached mode
				leader.tell(new SubmitJob(jobGraph,
						ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);

				JobManagerActorTestUtils.waitForJobStatus(
						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
			}

			// Who's the boss? The process whose Akka URL equals the address announced by the
			// leader listener is the leader, otherwise it has to be the other process.
			JobManagerProcess leadingJobManagerProcess;
			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
				leadingJobManagerProcess = jobManagerProcess[0];
			}
			else {
				leadingJobManagerProcess = jobManagerProcess[1];
			}

			// Kill the leading job manager process
			leadingJobManagerProcess.destroy();

			{
				// Recovery by the standby JobManager
				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());

				// Address and session ID are read anew because they belong to the new leader
				String leaderAddress = leaderListener.getAddress();
				UUID leaderId = leaderListener.getLeaderSessionID();

				ActorRef leaderRef = AkkaUtils.getActorRef(
						leaderAddress, testSystem, deadline.timeLeft());
				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);

				// The job counts as recovered once the new leader reports it as RUNNING
				JobManagerActorTestUtils.waitForJobStatus(
						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());

				// Cancel the job
				leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
			}

			// Wait for the execution result
			clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());

			// Count the submission acknowledgements recorded by the client
			int jobSubmitSuccessMessages = 0;
			for (Object msg : clientRef.underlyingActor().getMessages()) {
				if (msg instanceof JobManagerMessages.JobSubmitSuccess) {
					jobSubmitSuccessMessages++;
				}
			}

			// At least two submissions should be ack-ed (initial and recovery). This is quite
			// conservative, but it is still possible that these messages are overtaken by the
			// final message.
			// NOTE(review): the wording above says "at least two", but the assertion below
			// requires exactly two JobSubmitSuccess messages - confirm which one is intended.
			assertEquals(2, jobSubmitSuccessMessages);
		}
		catch (Throwable t) {
			// Print early (in some situations the process logs get too big
			// for Travis and the root problem is not shown)
			t.printStackTrace();

			// In case of an error, print the job manager process logs.
			if (jobManagerProcess[0] != null) {
				jobManagerProcess[0].printProcessLog();
			}

			if (jobManagerProcess[1] != null) {
				jobManagerProcess[1].printProcessLog();
			}

			throw t;
		}
		finally {
			// Release everything that has been started; the null checks cover failures that
			// happened before the respective component was created.
			if (jobManagerProcess[0] != null) {
				jobManagerProcess[0].destroy();
			}

			if (jobManagerProcess[1] != null) {
				jobManagerProcess[1].destroy();
			}

			if (leaderRetrievalService != null) {
				leaderRetrievalService.stop();
			}

			if (taskManagerSystem != null) {
				taskManagerSystem.shutdown();
			}

			if (testSystem != null) {
				testSystem.shutdown();
			}

			// Always non-null because it is created before the try block
			highAvailabilityServices.closeAndCleanupAllData();
		}
	}

	/**
	 * Simple recording client actor.
	 *
	 * <p>Every received message is stored (after unwrapping a {@link LeaderSessionMessage}) so
	 * that the test can inspect it later. Receiving a job result message releases a latch on
	 * which the test thread can wait via {@link #awaitJobResult(long)}.
	 */
	private static class RecordingTestClient extends UntypedActor {

		/** Received messages in arrival order; concurrent because the test thread reads them. */
		private final Queue<Object> messages = new ConcurrentLinkedQueue<>();

		/** Released as soon as a job result (success or failure) has been received. */
		private final CountDownLatch jobResultLatch = new CountDownLatch(1);

		/**
		 * Records the message and releases the result latch if it is a job result.
		 *
		 * @param message the received message, possibly wrapped in a {@link LeaderSessionMessage}
		 */
		@Override
		public void onReceive(Object message) throws Exception {
			// Record the payload rather than the leader session envelope
			final Object payload = message instanceof LeaderSessionMessage
					? ((LeaderSessionMessage) message).message()
					: message;

			messages.add(payload);

			// Check for job result
			if (payload instanceof JobManagerMessages.JobResultFailure ||
					payload instanceof JobManagerMessages.JobResultSuccess) {

				jobResultLatch.countDown();
			}
		}

		/**
		 * Returns the live queue of all messages recorded so far.
		 */
		public Queue<Object> getMessages() {
			return messages;
		}

		/**
		 * Blocks until a job result message has been received.
		 *
		 * @param timeout maximum time to wait in milliseconds
		 * @throws InterruptedException if the waiting thread is interrupted
		 * @throws AssertionError if no job result has been received within the timeout
		 */
		public void awaitJobResult(long timeout) throws InterruptedException {
			// await() returns false on timeout; surface that instead of silently continuing,
			// otherwise the caller cannot tell a timeout from a received result.
			if (!jobResultLatch.await(timeout, TimeUnit.MILLISECONDS)) {
				throw new AssertionError(
						"No job result has been received within " + timeout + " ms.");
			}
		}
	}

	// ---------------------------------------------------------------------------------------------

	/**
	 * Builds a job graph named "Blocking program" that consists of a single vertex running
	 * {@link BlockingNoOpInvokable}.
	 *
	 * @return the newly created job graph
	 */
	private static JobGraph createBlockingJobGraph() {
		final JobVertex blockingVertex = new JobVertex("Blocking Vertex");
		blockingVertex.setInvokableClass(BlockingNoOpInvokable.class);

		final JobGraph graph = new JobGraph("Blocking program");
		graph.addVertex(blockingVertex);
		return graph;
	}

	/**
	 * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
	 *
	 * <p>The check also fails if the ZooKeeper job graphs path does not exist or has never been
	 * modified, because in that case the test did not exercise any recovery state at all.
	 *
	 * @param config configuration from which the ZooKeeper job graphs path is read
	 * @throws Exception if the ZooKeeper client fails
	 */
	private void verifyCleanRecoveryState(Configuration config) throws Exception {
		// The file state backend directory must not contain any files
		Collection<File> stateHandles = FileUtils.listFiles(
				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);

		if (!stateHandles.isEmpty()) {
			fail("File state backend is not clean: " + stateHandles);
		}

		// ZooKeeper
		String currentJobsPath = config.getString(
				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);

		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);

		if (stat == null) {
			// checkExists() yields null for a missing node; report that explicitly instead of
			// running into a NullPointerException below
			fail("ZooKeeper path '" + currentJobsPath + "' does not exist. No job graph state " +
					"has been created during this test.");
		}

		if (stat.getCversion() == 0) {
			// Sanity check: verify that some changes have been performed
			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
					"this test. What are you testing?");
		}

		if (stat.getNumChildren() != 0) {
			// Is everything clean again?
			fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " +
					ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
		}
	}

	/**
	 * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
	 *
	 * <p>The state counts as cleaned if the file state backend directory contains no files, if
	 * the ZooKeeper job graphs path does not exist or has never been modified, or if that path
	 * has no children.
	 *
	 * @param config configuration from which the ZooKeeper job graphs path is read
	 * @throws Exception if the ZooKeeper client fails
	 */
	private void verifyRecoveryState(Configuration config) throws Exception {
		// The file state backend directory must still contain files
		Collection<File> stateHandles = FileUtils.listFiles(
				tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);

		if (stateHandles.isEmpty()) {
			fail("File state backend has been cleaned: " + stateHandles);
		}

		// ZooKeeper
		String currentJobsPath = config.getString(
			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);

		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);

		if (stat == null) {
			// checkExists() yields null for a missing node; report that explicitly instead of
			// running into a NullPointerException below
			fail("ZooKeeper path '" + currentJobsPath + "' does not exist. The persisted job " +
				"graphs have been removed or were never written.");
		}

		if (stat.getCversion() == 0) {
			// Sanity check: verify that some changes have been performed
			fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " +
				"this test. What are you testing?");
		}

		if (stat.getNumChildren() == 0) {
			// Children have been cleaned up?
			fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " +
				ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
		}
	}

}
